package com.adu.music.proxy;

import com.adu.music.bean.Proxy;
import com.adu.music.db.ProxyDao;
import org.apache.http.annotation.GuardedBy;
import org.apache.http.annotation.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * @author duchuanchuan
 * @date 2017/1/6
 */
@ThreadSafe
public class UnBlockedProxyPool extends AbstractProxyPool {

    private Logger logger = LoggerFactory.getLogger(UnBlockedProxyPool.class);

    private BlockingQueue<Proxy> proxyQueue;
    // 线程池容量
    @GuardedBy("this")
    private int capacity;
    private int threshold;
    private volatile boolean shutdownCalled;
    private ExecutorService executor = Executors.newCachedThreadPool();
    private ProxyDao proxyDao = ProxyDao.getInstance();

    UnBlockedProxyPool(int capacity) {
        this.capacity = capacity;
        this.proxyQueue = new LinkedBlockingDeque<>(capacity);
        this.shutdownCalled = false;
        this.initializeProxyQueue();
        this.threshold = capacity / 2;
    }

    UnBlockedProxyPool() {
        this(100);
    }

    private void initializeProxyQueue() {
        List<Proxy> proxyList = proxyDao.queryByIndex(0, capacity);
        proxyList.forEach(proxy -> {
            try {
                proxyQueue.put(proxy);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    @Override
    public Proxy getProxy() {
        if (shutdownCalled)
            throw new IllegalArgumentException("代理池已经关闭");
        if (proxyQueue.size() < threshold) {
            // 补充新的代理
            fillProxyQueue();
        }
        try {
            Proxy proxy = proxyQueue.take();
            if(proxy.isChecked() && proxy.isAvailable()) {
                return proxy;
            } else{
                while (true){
                    if(ProxyChecker.available(proxy)){
                        proxy.setChecked(true);
                        break;
                    }else {
                        proxy.setUnavailable();
                        release(proxy);
                        if(proxyQueue.size() == 0){
                            fillProxyQueue();
                        }
                        // 重新获取
                        proxy = proxyQueue.take();
                    }
                }
            }
            return proxy;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

    private void fillProxyQueue() {
        int index = 0;
        while (proxyQueue.size() < capacity) {
            List<Proxy> proxies = proxyDao.queryByIndex(index * capacity, capacity);
            if (proxies.size() == 0) break;
            proxies.forEach(proxy -> {
                if (!exists(proxy)) {
                    try {
                        proxyQueue.put(proxy);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            index++;
        }
    }

    private boolean exists(Proxy proxy) {
        return proxyQueue.stream().anyMatch(p -> p.getId() == proxy.getId());
    }


    @Override
    public void shutdown() {
        shutdownCalled = true;
        executor.shutdownNow();
        clearResources();
    }

    @Override
    protected void returnToPool(Proxy proxy) {
        if (proxy.isAvailable()) {
            executor.execute(new ProxyReturner(proxyQueue, proxy));
        }
    }

    @Override
    protected void remove(Proxy proxy) {
        proxy.setUnavailable();
        logger.info("remove proxy ID {}, URL : {}", proxy.getId(), proxy.getIp());
        // 更新到数据库
        proxyDao.updateProxyPriority(proxy);
    }

    private void clearResources() {
        // 更新到数据库
        proxyDao.batchUpdateProxyPriority(proxyQueue);
        proxyQueue.clear();
    }

    @Override
    public void open() {
        shutdownCalled = false;
    }

    private class ProxyReturner implements Runnable {
        private BlockingQueue<Proxy> proxyQueue;
        private Proxy proxy;

        private ProxyReturner(BlockingQueue<Proxy> proxyQueue, Proxy proxy) {
            this.proxy = proxy;
            this.proxyQueue = proxyQueue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    this.proxyQueue.put(proxy);
                    break;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
